You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/20 21:22:15 UTC
svn commit: r509738 - in /incubator/qpid/branches/qpid.0-9/java:
broker/src/main/java/org/apache/qpid/server/
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/handler/
client/src/main/java/org/apache/qpid/client/...
Author: kpvdr
Date: Tue Feb 20 12:22:14 2007
New Revision: 509738
URL: http://svn.apache.org/viewvc?view=rev&rev=509738
Log:
Fixed the various Ref modes so that the new MessageRefTest passes all tests.
Added:
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/MessageRefTest.java
- copied, changed from r508424, incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java
Removed:
incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Feb 20 12:22:14 2007
@@ -140,11 +140,11 @@
private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
private Set<Long> _browsedAcks = new HashSet<Long>();
-
+
/**
* Used in creating unique references.
*/
- private byte _refCounter;
+ private static AtomicLong _refIdCounter = new AtomicLong();
// XXX: clean up arguments
public AMQChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener)
@@ -290,32 +290,44 @@
public void addMessageOpen(MessageOpenBody open) throws AMQException
{
- try {
+ try
+ {
createReference(open.reference);
- } catch (IllegalArgumentException e) {
+ }
+ catch (IllegalArgumentException e)
+ {
throw open.getConnectionException(503, "Reference is already open");
}
}
public void addMessageAppend(MessageAppendBody append) throws AMQException
{
- try {
+ try
+ {
AMQReference ref = getReference(append.reference);
- ref.appendContent(ByteBuffer.wrap(append.bytes));
- } catch (IllegalArgumentException e) {
+ if (append.bytes != null) // sending an empty string results in a null
+ {
+ ref.appendContent(ByteBuffer.wrap(append.bytes));
+ }
+ }
+ catch (IllegalArgumentException e)
+ {
throw append.getConnectionException(503, "Reference is not open");
}
}
public void addMessageClose(MessageCloseBody close) throws AMQException
{
- try {
+ try
+ {
AMQReference ref = removeReference(close.reference);
for (AMQMessage msg : ref.getMessageList())
{
routeCurrentMessage(msg);
}
- } catch (IllegalArgumentException e) {
+ }
+ catch (IllegalArgumentException e)
+ {
throw close.getConnectionException(503, "Reference is not open");
}
}
@@ -392,8 +404,11 @@
_session.writeRequest(_channelId, mtb, listener);
}
- private synchronized byte[] nextRefId() {
- return new byte[]{_refCounter++};
+ private synchronized byte[] nextRefId()
+ {
+ // clumsy: the reference id is the counter value's decimal string converted to bytes
+ return String.valueOf(_refIdCounter.incrementAndGet()).getBytes();
+ //return new byte[]{_refIdCounter.getAndIncrement()};
}
public void deliverRef(AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
@@ -471,7 +486,7 @@
{
throw new ConsumerTagNotUniqueException();
}
-
+ acks = acks;
queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
_consumerTag2QueueMap.put(tag, queue);
return tag;
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Feb 20 12:22:14 2007
@@ -177,10 +177,15 @@
public AMQConnection(String broker, String username, String password,
String clientName, String virtualHost) throws AMQException, URLSyntaxException
{
+ this(broker, username, password, clientName, virtualHost, null);
+ }
+ public AMQConnection(String broker, String username, String password,
+ String clientName, String virtualHost, ConnectionTuneParameters params) throws AMQException, URLSyntaxException
+ {
this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
(clientName == null ? "" : clientName) + "/" +
- virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"));
+ virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), params);
}
public AMQConnection(String host, int port, String username, String password,
@@ -192,6 +197,12 @@
public AMQConnection(String host, int port, boolean useSSL, String username, String password,
String clientName, String virtualHost) throws AMQException, URLSyntaxException
{
+ this(host, port, useSSL, username, password, clientName, virtualHost, null);
+ }
+
+ public AMQConnection(String host, int port, boolean useSSL, String username, String password,
+ String clientName, String virtualHost, ConnectionTuneParameters params) throws AMQException, URLSyntaxException
+ {
this(new AMQConnectionURL(useSSL ?
ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
@@ -203,16 +214,25 @@
(clientName == null ? "" : clientName) +
virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+ "," + ConnectionURL.OPTIONS_SSL + "='false'"
- ));
+ ), params);
}
public AMQConnection(String connection) throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(connection));
+ this(new AMQConnectionURL(connection), null);
+ }
+
+ public AMQConnection(String connection, ConnectionTuneParameters params) throws AMQException, URLSyntaxException
+ {
+ this(new AMQConnectionURL(connection), params);
}
public AMQConnection(ConnectionURL connectionURL) throws AMQException
{
+ this(connectionURL, null);
+ }
+ public AMQConnection(ConnectionURL connectionURL, ConnectionTuneParameters params) throws AMQException
+ {
_logger.info("Connection:" + connectionURL);
_ConnectionId.incrementAndGet();
if (connectionURL == null)
@@ -229,7 +249,7 @@
_failoverPolicy = new FailoverPolicy(connectionURL);
- _protocolHandler = new AMQProtocolHandler(this);
+ _protocolHandler = new AMQProtocolHandler(this, params);
// We are not currently connected
_connected = false;
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb 20 12:22:14 2007
@@ -1645,10 +1645,13 @@
{
if (_startedAtLeastOnce.getAndSet(true))
{
- try{
+ try
+ {
//then we stopped this and are restarting, so signal server to resume delivery
unsuspendChannel();
- }catch(AMQException e){
+ }
+ catch(AMQException e)
+ {
_logger.error("Error Un Suspending Channel", e);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Tue Feb 20 12:22:14 2007
@@ -36,6 +36,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
@@ -106,6 +107,8 @@
private final boolean _waitUntilSent;
private static final Content[] NO_CONTENT = new Content[0];
+
+ private static AtomicLong _refIdCounter;
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
@@ -126,6 +129,7 @@
_immediate = immediate;
_mandatory = mandatory;
_waitUntilSent = waitUntilSent;
+ _refIdCounter = new AtomicLong();
}
void resubscribe() throws AMQException
@@ -256,19 +260,6 @@
}
}
- public void sendRef(Message message) throws JMSException
- {
- checkPreConditions();
- checkInitialDestination();
-
-
- synchronized (_connection.getFailoverMutex())
- {
- sendImpl(_destination, message, true, _deliveryMode, _messagePriority, _timeToLive,
- _mandatory, _immediate);
- }
- }
-
public void send(Message message, int deliveryMode) throws JMSException
{
checkPreConditions();
@@ -373,6 +364,44 @@
}
}
+ // Send entire message as a ref
+ public void sendAsRef(Message message) throws JMSException
+ {
+ checkPreConditions();
+ checkInitialDestination();
+
+
+ synchronized (_connection.getFailoverMutex())
+ {
+ sendImpl(_destination, message, true, _deliveryMode, _messagePriority, _timeToLive,
+ _mandatory, _immediate);
+ }
+ }
+
+ // Test methods for sending a ref
+ public String openRef() throws JMSException
+ {
+ String referenceId = generateReferenceId();
+ doMessageOpen(referenceId);
+ return referenceId;
+ }
+
+ public void transferRef(String referenceId, MessageHeaders messageHeaders) throws JMSException
+ {
+ Content content = new Content(Content.TypeEnum.REF_T, referenceId.getBytes());
+ doMessageTransfer(messageHeaders, _destination, content, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate, false);
+ }
+
+ public void appendRef(String referenceId, byte[] content) throws JMSException
+ {
+ doMessageAppend(referenceId, content);
+ }
+
+ public void closeRef(String referenceId) throws JMSException
+ {
+ doMessageClose(referenceId);
+ }
+
private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
{
@@ -526,7 +555,7 @@
Content data = new Content(Content.TypeEnum.INLINE_T, payload);
- doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,mandatory,immediate);
+ doMessageTransfer(messageHeaders, destination, data, deliveryMode, priority, timeToLive, mandatory, immediate, message.getJMSRedelivered());
}
else
{
@@ -547,8 +576,8 @@
doMessageOpen(referenceId);
// Message.Transfer
- Content data = new Content(Content.TypeEnum.REF_T, referenceId.getBytes());
- doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,mandatory,immediate);
+ Content data = new Content(Content.TypeEnum.REF_T, referenceId.getBytes());
+ doMessageTransfer(messageHeaders, destination, data, deliveryMode, priority, timeToLive, mandatory, immediate, message.getJMSRedelivered());
//Message.Append
for(Iterator it = content.iterator(); it.hasNext();)
@@ -572,8 +601,8 @@
}
private void doMessageTransfer(MessageHeaders messageHeaders, AMQDestination destination, Content content,
- AbstractJMSMessage message, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate) throws JMSException
+ int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
+ boolean redelivered) throws JMSException
{
try
{
@@ -583,7 +612,7 @@
_protocolHandler.getProtocolMinorVersion(), // AMQP minor version
messageHeaders.getAppId(), // String appId
messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
- content, // Content body
+ content, // Content body
messageHeaders.getEncoding(), // String contentEncoding
messageHeaders.getContentType(), // String contentType
messageHeaders.getCorrelationId(), // String correlationId
@@ -595,7 +624,7 @@
mandatory, // boolean mandatory
messageHeaders.getMessageId(), // String messageId
(short)priority, // short priority
- message.getJMSRedelivered(), // boolean redelivered
+ redelivered, // boolean redelivered
messageHeaders.getReplyTo(), // String replyTo
destination.getRoutingKey(), // String routingKey
new String("abc123").getBytes(), // byte[] securityToken
@@ -665,8 +694,9 @@
}
}
- private String generateReferenceId(){
- return String.valueOf(System.currentTimeMillis());
+ private String generateReferenceId()
+ {
+ return String.valueOf(_refIdCounter.incrementAndGet());
}
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Tue Feb 20 12:22:14 2007
@@ -59,9 +59,20 @@
params = new ConnectionTuneParameters();
}
- params.setFrameMax(frame.frameMax);
- params.setChannelMax(frame.channelMax);
- params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
+ // Set frame and channel max to smaller of client or broker size (if client size is set)
+ if (frame.getFrameMax() < params.getFrameMax() || params.getFrameMax() == 0)
+ {
+ params.setFrameMax(frame.getFrameMax());
+ }
+ if (frame.getChannelMax() < params.getChannelMax() || params.getChannelMax() == 0)
+ {
+ params.setChannelMax(frame.getChannelMax());
+ }
+ // Set heartbeat delay to lowest value
+ if (Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat) < params.getHeartbeat() || params.getHeartbeat() == 0)
+ {
+ params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
+ }
protocolSession.setConnectionTuneParameters(params);
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java Tue Feb 20 12:22:14 2007
@@ -50,11 +50,13 @@
{
protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod());
+// TODO: Fix this - the MessageOk responses are never sent; find a way to send them when the JMS
+// acknowledgement mode is appropriate.
// Be aware of possible changes to parameter order as versions change.
- final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
- protocolSession.getProtocolMajorVersion(), // AMQP major version
- protocolSession.getProtocolMinorVersion()); // AMQP minor version
- protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
+// final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+// protocolSession.getProtocolMajorVersion(), // AMQP major version
+// protocolSession.getProtocolMinorVersion()); // AMQP minor version
+// protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
}
catch (Exception e)
{
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java Tue Feb 20 12:22:14 2007
@@ -52,11 +52,13 @@
protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId);
_logger.debug("Method Close Body received, notify session to accept unprocessed message");
+// TODO: Fix this - the MessageOk responses are never sent; find a way to send them when the JMS
+// acknowledgement mode is appropriate.
// Be aware of possible changes to parameter order as versions change.
- final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
- protocolSession.getProtocolMajorVersion(), // AMQP major version
- protocolSession.getProtocolMinorVersion()); // AMQP minor version
- protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
+// final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+// protocolSession.getProtocolMajorVersion(), // AMQP major version
+// protocolSession.getProtocolMinorVersion()); // AMQP minor version
+// protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java Tue Feb 20 12:22:14 2007
@@ -48,14 +48,16 @@
public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
byte[] referenceId = ((MessageOpenBody)evt.getMethod()).getReference();
- final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), referenceId);
+ final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), referenceId);
protocolSession.unprocessedMessageReceived(new String(referenceId), msg);
+// TODO: Fix this - the MessageOk responses are never sent; find a way to send them when the JMS
+// acknowledgement mode is appropriate.
// Be aware of possible changes to parameter order as versions change.
- final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
- protocolSession.getProtocolMajorVersion(), // AMQP major version
- protocolSession.getProtocolMinorVersion()); // AMQP minor version
- protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
+// final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+// protocolSession.getProtocolMajorVersion(), // AMQP major version
+// protocolSession.getProtocolMinorVersion()); // AMQP minor version
+// protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java Tue Feb 20 12:22:14 2007
@@ -78,7 +78,7 @@
else
{
String referenceId = new String(transferBody.getBody().getContentAsByteArray());
- protocolSession.messageTransferBodyReceivedForReferenceCase(referenceId, messageHeaders,transferBody.getRedelivered());
+ protocolSession.messageTransferBodyReceivedForReferenceCase(referenceId, evt.getRequestId(), messageHeaders, transferBody.getRedelivered());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Tue Feb 20 12:22:14 2007
@@ -41,10 +41,9 @@
private boolean redeliveredFlag;
private MessageHeaders messageHeaders;
- public UnprocessedMessage(int channelId, long deliveryTag, byte[] referenceId)
+ public UnprocessedMessage(int channelId, byte[] referenceId)
{
this.channelId = channelId;
- this.deliveryTag = deliveryTag;
this.referenceId = referenceId;
}
@@ -113,11 +112,18 @@
new String(contents.get(0));
}
- public void setMessageHeaders(MessageHeaders messageHeaders) {
+ public void setDeliveryTag(long deliveryTag)
+ {
+ this.deliveryTag = deliveryTag;
+ }
+
+ public void setMessageHeaders(MessageHeaders messageHeaders)
+ {
this.messageHeaders = messageHeaders;
}
- public void setRedeliveredFlag(boolean redeliveredFlag) {
+ public void setRedeliveredFlag(boolean redeliveredFlag)
+ {
this.redeliveredFlag = redeliveredFlag;
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Feb 20 12:22:14 2007
@@ -35,6 +35,7 @@
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.ConnectionTuneParameters;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
import org.apache.qpid.client.state.AMQState;
@@ -70,6 +71,7 @@
* mapping between connection instances and protocol handler instances.
*/
private AMQConnection _connection;
+ private ConnectionTuneParameters _params;
/**
* Used only when determining whether to add the SSL filter or not. This should be made more
@@ -104,9 +106,10 @@
private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
- public AMQProtocolHandler(AMQConnection con)
+ public AMQProtocolHandler(AMQConnection con, ConnectionTuneParameters params)
{
_connection = con;
+ _params = params;
}
public boolean isUseSSL()
@@ -156,6 +159,8 @@
}
_protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
+ if (_params != null)
+ _protocolSession.setConnectionTuneParameters(_params);
_protocolSession.init();
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Tue Feb 20 12:22:14 2007
@@ -282,8 +282,10 @@
msg.addContent(appendBody.bytes);
}
- public void messageTransferBodyReceivedForReferenceCase(String referenceId,MessageHeaders messageHeaders,boolean redilivered){
+ public void messageTransferBodyReceivedForReferenceCase(String referenceId, long deliveryTag, MessageHeaders messageHeaders, boolean redilivered)
+ {
UnprocessedMessage msg = (UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId);
+ msg.setDeliveryTag(deliveryTag);
msg.setMessageHeaders(messageHeaders);
msg.setRedeliveredFlag(redilivered);
}
Copied: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/MessageRefTest.java (from r508424, incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/MessageRefTest.java?view=diff&rev=509738&p1=incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java&r1=508424&p2=incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/MessageRefTest.java&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/MessageRefTest.java Tue Feb 20 12:22:14 2007
@@ -26,13 +26,15 @@
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.message.JMSTextMessage;
import javax.jms.*;
/**
* @author Apache Software Foundation
*/
-public class PubSubTwoConnectionRefTest extends TestCase
+public class MessageRefTest extends TestCase
{
protected void setUp() throws Exception
{
@@ -45,37 +47,226 @@
super.tearDown();
}
- /**
- * This tests that a consumer is set up synchronously
- * @throws Exception
- */
- public void testTwoConnections() throws Exception
+ public void testOneWayRef() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic");
AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test");
- AMQSession session1 = con1.createAMQSession(false, AMQSession.NO_ACKNOWLEDGE);
+ AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE);
BasicMessageProducer producer = session1.createBasicProducer(topic);
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test");
- Session session2 = con2.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session2.createConsumer(topic);
con2.start();
- producer.sendRef(session1.createTextMessage("Hello ref"));
-// producer.sendRef(session1.createTextMessage("Goodbye ref"));
+
+ producer.sendAsRef(session1.createTextMessage("Hello ref"));
TextMessage tm1 = (TextMessage) consumer.receive(2000);
assertNotNull(tm1);
assertEquals("Hello ref", tm1.getText());
-// assertEquals("Goodbye ref", tm1.getText());
+
+ con2.close();
+ con1.close();
}
- public static void main(String[] args){
- PubSubTwoConnectionRefTest test = new PubSubTwoConnectionRefTest();
- try {
- test.setUp();
- test.testTwoConnections();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ public void testOneWayRefAppend() throws Exception
+ {
+ AMQTopic topic = new AMQTopic("MyTopic");
+ AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test");
+ AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ BasicMessageProducer producer = session1.createBasicProducer(topic);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test");
+ Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session2.createConsumer(topic);
+ con2.start();
+
+ String refId = producer.openRef();
+ producer.transferRef(refId, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders());
+ producer.appendRef(refId, new String("ABC").getBytes());
+ producer.appendRef(refId, new String("123").getBytes());
+ producer.appendRef(refId, new String("").getBytes());
+ producer.appendRef(refId, new String("DEF").getBytes());
+ producer.appendRef(refId, new String("456").getBytes());
+ producer.closeRef(refId);
+ TextMessage tm1 = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm1);
+ assertEquals("ABC123DEF456", tm1.getText());
+
+ con2.close();
+ con1.close();
+ }
+
+ public void testTwoWayRef() throws Exception
+ {
+ // Set the maximum frame size to 1000 and send a message of 2500 characters
+ ConnectionTuneParameters tp = new ConnectionTuneParameters();
+ tp.setFrameMax(1000L);
+ tp.setChannelMax(32767);
+ tp.setHeartbeat(600);
+ String message = createMessage(2500);
+
+ AMQTopic topic = new AMQTopic("MyTopic");
+ AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test", tp);
+ AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ BasicMessageProducer producer = session1.createBasicProducer(topic);
+
+ AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test", tp);
+ Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session2.createConsumer(topic);
+ con2.start();
+
+ producer.send(session1.createTextMessage(message));
+ TextMessage tm1 = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm1);
+ assertEquals(message, tm1.getText());
+
+ con2.close();
+ con1.close();
+ }
+
+ public void testUpSmallDownBig() throws Exception
+ {
+ ConnectionTuneParameters tp1 = new ConnectionTuneParameters();
+ tp1.setFrameMax(1000L);
+ tp1.setChannelMax(32767);
+ tp1.setHeartbeat(600);
+ ConnectionTuneParameters tp2 = new ConnectionTuneParameters();
+ tp2.setFrameMax(2000L);
+ tp2.setChannelMax(32767);
+ tp2.setHeartbeat(600);
+ String message = createMessage(2500);
+
+ AMQTopic topic = new AMQTopic("MyTopic");
+ AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test", tp1);
+ AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ BasicMessageProducer producer = session1.createBasicProducer(topic);
+
+ AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test", tp2);
+ Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session2.createConsumer(topic);
+ con2.start();
+
+ producer.send(session1.createTextMessage(message));
+ TextMessage tm1 = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm1);
+ assertEquals(message, tm1.getText());
+
+ con2.close();
+ con1.close();
+ }
+
+ //*** Uncomment this test when the rechunking code has been included in AMQChannel.deliver() ***
+ /* public void testUpBigDownSmall() throws Exception
+ {
+ ConnectionTuneParameters tp1 = new ConnectionTuneParameters();
+ tp1.setFrameMax(2000L);
+ tp1.setChannelMax(32767);
+ tp1.setHeartbeat(600);
+ ConnectionTuneParameters tp2 = new ConnectionTuneParameters();
+ tp2.setFrameMax(1000L);
+ tp2.setChannelMax(32767);
+ tp2.setHeartbeat(600);
+ String message = createMessage(2500);
+
+ AMQTopic topic = new AMQTopic("MyTopic");
+ AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test", tp1);
+ AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ BasicMessageProducer producer = session1.createBasicProducer(topic);
+
+ AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test", tp2);
+ Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session2.createConsumer(topic);
+ con2.start();
+
+ producer.send(session1.createTextMessage(message));
+ TextMessage tm1 = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm1);
+ assertEquals(message, tm1.getText());
+
+ con2.close();
+ con1.close();
+ } */
+
+ public void testInterleavedRefs() throws Exception
+ {
+ ConnectionTuneParameters tp = new ConnectionTuneParameters();
+ tp.setFrameMax(1000L);
+ tp.setChannelMax(32767);
+ tp.setHeartbeat(600);
+ String message = createMessage(500);
+
+ AMQTopic topic = new AMQTopic("MyTopic");
+ AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test");
+ AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ BasicMessageProducer producer = session1.createBasicProducer(topic);
+
+ AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test", tp);
+ Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session2.createConsumer(topic);
+ con2.start();
+
+ String refId1 = producer.openRef();
+ String refId2 = producer.openRef();
+ producer.transferRef(refId1, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders());
+ producer.transferRef(refId2, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders());
+ producer.appendRef(refId1, message.getBytes());
+ producer.appendRef(refId2, message.getBytes());
+ String refId3 = producer.openRef();
+ producer.appendRef(refId1, message.getBytes());
+ producer.transferRef(refId3, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders());
+ producer.appendRef(refId3, message.getBytes());
+ producer.appendRef(refId3, message.getBytes());
+ producer.appendRef(refId1, message.getBytes());
+ producer.closeRef(refId1);
+ producer.appendRef(refId3, message.getBytes());
+ producer.appendRef(refId3, message.getBytes());
+ producer.closeRef(refId2);
+ producer.appendRef(refId3, message.getBytes());
+ producer.closeRef(refId3);
+
+ TextMessage tm1 = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm1);
+ assertEquals(message + message + message, tm1.getText());
+ TextMessage tm2 = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm2);
+ assertEquals(message, tm2.getText());
+ TextMessage tm3 = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm3);
+ assertEquals(message + message + message + message + message, tm3.getText());
+
+ con2.close();
+ con1.close();
+ }
+
+ public void testEmptyContentRef() throws Exception
+ {
+ AMQTopic topic = new AMQTopic("MyTopic");
+ AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test");
+ AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ BasicMessageProducer producer = session1.createBasicProducer(topic);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test");
+ Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session2.createConsumer(topic);
+ con2.start();
+
+ String refId = producer.openRef();
+ producer.transferRef(refId, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders());
+ producer.closeRef(refId);
+ TextMessage tm1 = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm1);
+ assertEquals("", tm1.getText());
+
+ con2.close();
+ con1.close();
+ }
+
+ // Utility to create a message "012345678901234567890..." that is len characters long.
+ private String createMessage(int len)
+ {
+ StringBuffer sb = new StringBuffer(len);
+ for (int i=0; i<len; i++)
+ sb.append(i%10);
+ return sb.toString();
}
}