You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/20 21:22:15 UTC

svn commit: r509738 - in /incubator/qpid/branches/qpid.0-9/java: broker/src/main/java/org/apache/qpid/server/ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/handler/ client/src/main/java/org/apache/qpid/client/...

Author: kpvdr
Date: Tue Feb 20 12:22:14 2007
New Revision: 509738

URL: http://svn.apache.org/viewvc?view=rev&rev=509738
Log:
Fixed the various Ref modes so that the new MessageRefTest passes all tests.

Added:
    incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/MessageRefTest.java
      - copied, changed from r508424, incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java
Removed:
    incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java
Modified:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Feb 20 12:22:14 2007
@@ -140,11 +140,11 @@
     private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
 
     private Set<Long> _browsedAcks = new HashSet<Long>();
-
+    
     /**
      * Used in creating unique references. 
      */
-    private byte _refCounter;
+    private static AtomicLong _refIdCounter = new AtomicLong();
 
     // XXX: clean up arguments
     public AMQChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener)
@@ -290,32 +290,44 @@
 
     public void addMessageOpen(MessageOpenBody open) throws AMQException
     {
-        try {
+        try
+        {
             createReference(open.reference);
-        } catch (IllegalArgumentException e) {
+        }
+        catch (IllegalArgumentException e)
+        {
             throw open.getConnectionException(503, "Reference is already open");
         }
     }
 
     public void addMessageAppend(MessageAppendBody append) throws AMQException
     {
-        try {
+        try
+        {
             AMQReference ref = getReference(append.reference);
-            ref.appendContent(ByteBuffer.wrap(append.bytes));
-        } catch (IllegalArgumentException e) {
+            if (append.bytes != null) // sending an empty string results in a null
+            {
+                ref.appendContent(ByteBuffer.wrap(append.bytes));
+            }
+        }
+        catch (IllegalArgumentException e)
+        {
             throw append.getConnectionException(503, "Reference is not open");
         }
     }
 
     public void addMessageClose(MessageCloseBody close) throws AMQException
     {
-        try {
+        try
+        {
             AMQReference ref = removeReference(close.reference);
             for (AMQMessage msg : ref.getMessageList())
             {
                 routeCurrentMessage(msg);
             }
-        } catch (IllegalArgumentException e) {
+        }
+        catch (IllegalArgumentException e)
+        {
             throw close.getConnectionException(503, "Reference is not open");
         }
     }
@@ -392,8 +404,11 @@
         _session.writeRequest(_channelId, mtb, listener);
     }
 
-    private synchronized byte[] nextRefId() {
-        return new byte[]{_refCounter++};
+    private synchronized byte[] nextRefId()
+    {
+        // clumsy: the counter value is sent as the bytes of its decimal string form
+        return String.valueOf(_refIdCounter.incrementAndGet()).getBytes();
+        //return new byte[]{_refIdCounter.getAndIncrement()};
     }
     
     public void deliverRef(AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
@@ -471,7 +486,7 @@
         {
             throw new ConsumerTagNotUniqueException();
         }
-
+        acks = acks;
         queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
         _consumerTag2QueueMap.put(tag, queue);
         return tag;

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Feb 20 12:22:14 2007
@@ -177,10 +177,15 @@
     public AMQConnection(String broker, String username, String password,
                          String clientName, String virtualHost) throws AMQException, URLSyntaxException
     {
+        this(broker, username, password, clientName, virtualHost, null);
+    }
+    public AMQConnection(String broker, String username, String password,
+                         String clientName, String virtualHost, ConnectionTuneParameters params) throws AMQException, URLSyntaxException
+    {
         this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
                                   username + ":" + password + "@" +
                                   (clientName == null ? "" : clientName) + "/" +
-                                  virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"));
+                                  virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), params);
     }
 
     public AMQConnection(String host, int port, String username, String password,
@@ -192,6 +197,12 @@
     public AMQConnection(String host, int port, boolean useSSL, String username, String password,
                          String clientName, String virtualHost) throws AMQException, URLSyntaxException
     {
+        this(host, port, useSSL, username, password, clientName, virtualHost, null);
+    }
+    
+    public AMQConnection(String host, int port, boolean useSSL, String username, String password,
+                         String clientName, String virtualHost, ConnectionTuneParameters params) throws AMQException, URLSyntaxException
+    {
         this(new AMQConnectionURL(useSSL ?
                                   ConnectionURL.AMQ_PROTOCOL + "://" +
                                   username + ":" + password + "@" +
@@ -203,16 +214,25 @@
                                                                                 (clientName == null ? "" : clientName) +
                                                                                 virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
                                                                                 + "," + ConnectionURL.OPTIONS_SSL + "='false'"
-        ));
+        ), params);
     }
 
     public AMQConnection(String connection) throws AMQException, URLSyntaxException
     {
-        this(new AMQConnectionURL(connection));
+        this(new AMQConnectionURL(connection), null);
+    }
+
+    public AMQConnection(String connection, ConnectionTuneParameters params) throws AMQException, URLSyntaxException
+    {
+        this(new AMQConnectionURL(connection), params);
     }
 
     public AMQConnection(ConnectionURL connectionURL) throws AMQException
     {
+        this(connectionURL, null);
+    }
+    public AMQConnection(ConnectionURL connectionURL, ConnectionTuneParameters params) throws AMQException
+    {
         _logger.info("Connection:" + connectionURL);
         _ConnectionId.incrementAndGet();
         if (connectionURL == null)
@@ -229,7 +249,7 @@
 
         _failoverPolicy = new FailoverPolicy(connectionURL);
 
-        _protocolHandler = new AMQProtocolHandler(this);
+        _protocolHandler = new AMQProtocolHandler(this, params);
 
         // We are not currently connected
         _connected = false;

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb 20 12:22:14 2007
@@ -1645,10 +1645,13 @@
     {
         if (_startedAtLeastOnce.getAndSet(true))
         {
-        	try{
+        	try
+            {
             	//then we stopped this and are restarting, so signal server to resume delivery
         		unsuspendChannel();
-	        }catch(AMQException e){
+	        }
+            catch(AMQException e)
+            {
 	        	_logger.error("Error Un Suspending Channel", e);
 	        }
         }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Tue Feb 20 12:22:14 2007
@@ -36,6 +36,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
 {
@@ -106,6 +107,8 @@
 
     private final boolean _waitUntilSent;
     private static final Content[] NO_CONTENT = new Content[0];
+    
+    private static AtomicLong _refIdCounter;
 
     protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
                                    int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
@@ -126,6 +129,7 @@
         _immediate = immediate;
         _mandatory = mandatory;
         _waitUntilSent = waitUntilSent;
+        _refIdCounter = new AtomicLong();
     }
 
     void resubscribe() throws AMQException
@@ -256,19 +260,6 @@
         }
     }
 
-    public void sendRef(Message message) throws JMSException
-    {
-        checkPreConditions();
-        checkInitialDestination();
-
-
-        synchronized (_connection.getFailoverMutex())
-        {
-            sendImpl(_destination, message, true, _deliveryMode, _messagePriority, _timeToLive,
-                     _mandatory, _immediate);
-        }
-    }
-
     public void send(Message message, int deliveryMode) throws JMSException
     {
         checkPreConditions();
@@ -373,6 +364,44 @@
         }
     }
 
+    // Send entire message as a ref
+    public void sendAsRef(Message message) throws JMSException
+    {
+        checkPreConditions();
+        checkInitialDestination();
+
+
+        synchronized (_connection.getFailoverMutex())
+        {
+            sendImpl(_destination, message, true, _deliveryMode, _messagePriority, _timeToLive,
+                     _mandatory, _immediate);
+        }
+    }
+    
+    // Test methods for sending a ref
+    public String openRef() throws JMSException
+    {
+        String referenceId = generateReferenceId();
+        doMessageOpen(referenceId);
+        return referenceId;
+    }
+    
+    public void transferRef(String referenceId, MessageHeaders messageHeaders) throws JMSException
+    {
+        Content content = new Content(Content.TypeEnum.REF_T, referenceId.getBytes()); 
+        doMessageTransfer(messageHeaders, _destination, content, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate, false);
+    }
+    
+    public void appendRef(String referenceId, byte[] content) throws JMSException
+    {
+        doMessageAppend(referenceId, content);
+    }
+    
+    public void closeRef(String referenceId) throws JMSException
+    {
+        doMessageClose(referenceId);
+    }
+
 
     private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
     {
@@ -526,7 +555,7 @@
 
         	Content data = new Content(Content.TypeEnum.INLINE_T, payload);
 
-        	doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,mandatory,immediate);
+        	doMessageTransfer(messageHeaders, destination, data, deliveryMode, priority, timeToLive, mandatory, immediate, message.getJMSRedelivered());
         }
         else
         {
@@ -547,8 +576,8 @@
         	doMessageOpen(referenceId);
         	
         	// Message.Transfer
-        	Content data = new Content(Content.TypeEnum.REF_T, referenceId.getBytes()); 
-        	doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,mandatory,immediate);
+        	Content data = new Content(Content.TypeEnum.REF_T, referenceId.getBytes());
+        	doMessageTransfer(messageHeaders, destination, data, deliveryMode, priority, timeToLive, mandatory, immediate, message.getJMSRedelivered());
         	
         	//Message.Append
         	for(Iterator it = content.iterator(); it.hasNext();)
@@ -572,8 +601,8 @@
     }
     
     private void doMessageTransfer(MessageHeaders messageHeaders, AMQDestination destination, Content content,
-            AbstractJMSMessage message, int deliveryMode, int priority,
-            long timeToLive, boolean mandatory, boolean immediate) throws JMSException
+            int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
+            boolean redelivered) throws JMSException
     {
     	try
         {
@@ -583,7 +612,7 @@
                 _protocolHandler.getProtocolMinorVersion(), // AMQP minor version
                 messageHeaders.getAppId(),      // String appId
                 messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
-                content,                     // Content body
+                content,                        // Content body
                 messageHeaders.getEncoding(),   // String contentEncoding
                 messageHeaders.getContentType(), // String contentType
                 messageHeaders.getCorrelationId(), // String correlationId
@@ -595,7 +624,7 @@
                 mandatory,                      // boolean mandatory
                 messageHeaders.getMessageId(),  // String messageId
                 (short)priority,                // short priority
-                message.getJMSRedelivered(),    // boolean redelivered
+                redelivered,                    // boolean redelivered
                 messageHeaders.getReplyTo(),    // String replyTo
                 destination.getRoutingKey(),    // String routingKey
                 new String("abc123").getBytes(), // byte[] securityToken
@@ -665,8 +694,9 @@
         }
     }
     
-    private String generateReferenceId(){
-    	return String.valueOf(System.currentTimeMillis());
+    private String generateReferenceId()
+    {
+        return String.valueOf(_refIdCounter.incrementAndGet());
     }
 
     private void checkTemporaryDestination(AMQDestination destination) throws JMSException

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Tue Feb 20 12:22:14 2007
@@ -59,9 +59,20 @@
             params = new ConnectionTuneParameters();
         }
 
-        params.setFrameMax(frame.frameMax);        
-        params.setChannelMax(frame.channelMax);
-        params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
+        // Set frame and channel max to the smaller of the client and broker values; a client value of 0 is treated as not set
+        if (frame.getFrameMax() < params.getFrameMax() || params.getFrameMax() == 0)
+        {
+            params.setFrameMax(frame.getFrameMax());
+        }
+        if (frame.getChannelMax() < params.getChannelMax() || params.getChannelMax() == 0)
+        {
+            params.setChannelMax(frame.getChannelMax());
+        }
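+        // Set heartbeat delay to the lower of the client value and the broker value (or its amqj.heartbeat.delay override); a client value of 0 is treated as not set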
+        if (Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat) < params.getHeartbeat() || params.getHeartbeat() == 0)
+        {
+            params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
+        }
         protocolSession.setConnectionTuneParameters(params);
 
         stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java Tue Feb 20 12:22:14 2007
@@ -50,11 +50,13 @@
         {
 			protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod());
 
+// TODO: Fix this - the MessageOk responses are currently never sent; find a way to send them
+// when the JMS acknowledgement mode is appropriate.
             // Be aware of possible changes to parameter order as versions change.
-            final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
-                protocolSession.getProtocolMajorVersion(), // AMQP major version
-                protocolSession.getProtocolMinorVersion()); // AMQP minor version
-            protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
+//             final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+//                 protocolSession.getProtocolMajorVersion(), // AMQP major version
+//                 protocolSession.getProtocolMinorVersion()); // AMQP minor version
+//             protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
 		}
         catch (Exception e)
         {

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java Tue Feb 20 12:22:14 2007
@@ -52,11 +52,13 @@
 		protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId);
 		_logger.debug("Method Close Body received, notify session to accept unprocessed message");
 
+// TODO: Fix this - the MessageOk responses are currently never sent; find a way to send them
+// when the JMS acknowledgement mode is appropriate.
         // Be aware of possible changes to parameter order as versions change.
-        final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
-            protocolSession.getProtocolMajorVersion(), // AMQP major version
-            protocolSession.getProtocolMinorVersion()); // AMQP minor version
-        protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
+//         final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+//             protocolSession.getProtocolMajorVersion(), // AMQP major version
+//             protocolSession.getProtocolMinorVersion()); // AMQP minor version
+//         protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java Tue Feb 20 12:22:14 2007
@@ -48,14 +48,16 @@
     public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
     {
     	byte[] referenceId = ((MessageOpenBody)evt.getMethod()).getReference();
-    	final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), referenceId);
+    	final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), referenceId);
     	protocolSession.unprocessedMessageReceived(new String(referenceId), msg);
 
+// TODO: Fix this - the MessageOk responses are currently never sent; find a way to send them
+// when the JMS acknowledgement mode is appropriate.
         // Be aware of possible changes to parameter order as versions change.
-        final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
-            protocolSession.getProtocolMajorVersion(), // AMQP major version
-            protocolSession.getProtocolMinorVersion()); // AMQP minor version
-        protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
+//         final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+//             protocolSession.getProtocolMajorVersion(), // AMQP major version
+//             protocolSession.getProtocolMinorVersion()); // AMQP minor version
+//         protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java Tue Feb 20 12:22:14 2007
@@ -78,7 +78,7 @@
         else
         {
         	String referenceId = new String(transferBody.getBody().getContentAsByteArray());
-        	protocolSession.messageTransferBodyReceivedForReferenceCase(referenceId, messageHeaders,transferBody.getRedelivered());
+        	protocolSession.messageTransferBodyReceivedForReferenceCase(referenceId, evt.getRequestId(), messageHeaders, transferBody.getRedelivered());
         }
         
     }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Tue Feb 20 12:22:14 2007
@@ -41,10 +41,9 @@
     private boolean redeliveredFlag;
 	private MessageHeaders messageHeaders;
     
-    public UnprocessedMessage(int channelId, long deliveryTag, byte[] referenceId)
+    public UnprocessedMessage(int channelId, byte[] referenceId)
     {
         this.channelId = channelId;
-        this.deliveryTag = deliveryTag;
         this.referenceId = referenceId;
     }
     
@@ -113,11 +112,18 @@
             new String(contents.get(0));
     }
 
-	public void setMessageHeaders(MessageHeaders messageHeaders) {
+    public void setDeliveryTag(long deliveryTag)
+    {
+        this.deliveryTag = deliveryTag;
+    }
+
+	public void setMessageHeaders(MessageHeaders messageHeaders)
+    {
 		this.messageHeaders = messageHeaders;
 	}
 
-	public void setRedeliveredFlag(boolean redeliveredFlag) {
+	public void setRedeliveredFlag(boolean redeliveredFlag)
+    {
 		this.redeliveredFlag = redeliveredFlag;
 	}
 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Feb 20 12:22:14 2007
@@ -35,6 +35,7 @@
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.ConnectionTuneParameters;
 import org.apache.qpid.client.failover.FailoverHandler;
 import org.apache.qpid.client.failover.FailoverState;
 import org.apache.qpid.client.state.AMQState;
@@ -70,6 +71,7 @@
      * mapping between connection instances and protocol handler instances.
      */
     private AMQConnection _connection;
+    private ConnectionTuneParameters _params;
 
     /**
      * Used only when determining whether to add the SSL filter or not. This should be made more
@@ -104,9 +106,10 @@
 
     private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
 
-    public AMQProtocolHandler(AMQConnection con)
+    public AMQProtocolHandler(AMQConnection con, ConnectionTuneParameters params)
     {
         _connection = con;
+        _params = params;
     }
 
     public boolean isUseSSL()
@@ -156,6 +159,8 @@
         }
   
         _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
+        if (_params != null)
+            _protocolSession.setConnectionTuneParameters(_params);
         _protocolSession.init();
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=509738&r1=509737&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Tue Feb 20 12:22:14 2007
@@ -282,8 +282,10 @@
     	msg.addContent(appendBody.bytes);
     }
     
-    public void messageTransferBodyReceivedForReferenceCase(String referenceId,MessageHeaders messageHeaders,boolean redilivered){
+    public void messageTransferBodyReceivedForReferenceCase(String referenceId, long deliveryTag, MessageHeaders messageHeaders, boolean redilivered)
+    {
     	UnprocessedMessage msg = (UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId);
+        msg.setDeliveryTag(deliveryTag);
     	msg.setMessageHeaders(messageHeaders);
     	msg.setRedeliveredFlag(redilivered);    	
     }

Copied: incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/MessageRefTest.java (from r508424, incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/MessageRefTest.java?view=diff&rev=509738&p1=incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java&r1=508424&p2=incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/MessageRefTest.java&r2=509738
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/test/java/org/apache/qpid/test/unit/basic/MessageRefTest.java Tue Feb 20 12:22:14 2007
@@ -26,13 +26,15 @@
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.message.JMSTextMessage;
 
 import javax.jms.*;
 
 /**
  * @author Apache Software Foundation
  */
-public class PubSubTwoConnectionRefTest extends TestCase
+public class MessageRefTest extends TestCase
 {
     protected void setUp() throws Exception
     {
@@ -45,37 +47,226 @@
         super.tearDown();
     }
 
-    /**
-     * This tests that a consumer is set up synchronously
-     * @throws Exception
-     */
-    public void testTwoConnections() throws Exception
+    public void testOneWayRef() throws Exception
     {
         AMQTopic topic = new AMQTopic("MyTopic");
         AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test");
-        AMQSession session1 = con1.createAMQSession(false, AMQSession.NO_ACKNOWLEDGE);
+        AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE);
         BasicMessageProducer producer = session1.createBasicProducer(topic);
 
         Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test");
-        Session session2 = con2.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session2.createConsumer(topic);
         con2.start();        
-        producer.sendRef(session1.createTextMessage("Hello ref"));
-//        producer.sendRef(session1.createTextMessage("Goodbye ref"));
+        
+        producer.sendAsRef(session1.createTextMessage("Hello ref"));
         TextMessage tm1 = (TextMessage) consumer.receive(2000);
         assertNotNull(tm1);
         assertEquals("Hello ref", tm1.getText());
-//        assertEquals("Goodbye ref", tm1.getText());
+        
+        con2.close();
+        con1.close();
     }
     
-    public static void main(String[] args){
-    	PubSubTwoConnectionRefTest test = new PubSubTwoConnectionRefTest();
-    	try {
-			test.setUp();
-			test.testTwoConnections();
-		} catch (Exception e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
+    public void testOneWayRefAppend() throws Exception
+    {
+        AMQTopic topic = new AMQTopic("MyTopic");
+        AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test");
+        AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        BasicMessageProducer producer = session1.createBasicProducer(topic);
+
+        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test");
+        Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session2.createConsumer(topic);
+        con2.start();        
+        
+        String refId = producer.openRef();
+        producer.transferRef(refId, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders());
+        producer.appendRef(refId, new String("ABC").getBytes());
+        producer.appendRef(refId, new String("123").getBytes());
+        producer.appendRef(refId, new String("").getBytes());
+        producer.appendRef(refId, new String("DEF").getBytes());
+        producer.appendRef(refId, new String("456").getBytes());
+        producer.closeRef(refId);
+        TextMessage tm1 = (TextMessage) consumer.receive(2000);
+        assertNotNull(tm1);
+        assertEquals("ABC123DEF456", tm1.getText());
+        
+        con2.close();
+        con1.close();
+    }
+
+    public void testTwoWayRef()  throws Exception
+    {
+        // Set the maximum frame size to 1000 and send a message of 2500 chars
+        ConnectionTuneParameters tp = new ConnectionTuneParameters();
+        tp.setFrameMax(1000L);
+        tp.setChannelMax(32767);
+        tp.setHeartbeat(600);
+        String message = createMessage(2500);
+        
+        AMQTopic topic = new AMQTopic("MyTopic");
+        AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test", tp);
+        AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        BasicMessageProducer producer = session1.createBasicProducer(topic);
+
+        AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test", tp);
+        Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session2.createConsumer(topic);
+        con2.start();
+               
+        producer.send(session1.createTextMessage(message));
+        TextMessage tm1 = (TextMessage) consumer.receive(2000);
+        assertNotNull(tm1);
+        assertEquals(message, tm1.getText());
+        
+        con2.close();
+        con1.close();
+    }
+
+    public void testUpSmallDownBig() throws Exception
+    {
+        ConnectionTuneParameters tp1 = new ConnectionTuneParameters();
+        tp1.setFrameMax(1000L);
+        tp1.setChannelMax(32767);
+        tp1.setHeartbeat(600);
+        ConnectionTuneParameters tp2 = new ConnectionTuneParameters();
+        tp2.setFrameMax(2000L);
+        tp2.setChannelMax(32767);
+        tp2.setHeartbeat(600);
+        String message = createMessage(2500);
+        
+        AMQTopic topic = new AMQTopic("MyTopic");
+        AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test", tp1);
+        AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        BasicMessageProducer producer = session1.createBasicProducer(topic);
+
+        AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test", tp2);
+        Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session2.createConsumer(topic);
+        con2.start();
+               
+        producer.send(session1.createTextMessage(message));
+        TextMessage tm1 = (TextMessage) consumer.receive(2000);
+        assertNotNull(tm1);
+        assertEquals(message, tm1.getText());
+        
+        con2.close();
+        con1.close();
+    }
+
+    //*** Uncomment this test when the rechunking code has been included in AMQChannel.deliver() ***
+    /* public void testUpBigDownSmall() throws Exception
+    {
+        ConnectionTuneParameters tp1 = new ConnectionTuneParameters();
+        tp1.setFrameMax(2000L);
+        tp1.setChannelMax(32767);
+        tp1.setHeartbeat(600);
+        ConnectionTuneParameters tp2 = new ConnectionTuneParameters();
+        tp2.setFrameMax(1000L);
+        tp2.setChannelMax(32767);
+        tp2.setHeartbeat(600);
+        String message = createMessage(2500);
+        
+        AMQTopic topic = new AMQTopic("MyTopic");
+        AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test", tp1);
+        AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        BasicMessageProducer producer = session1.createBasicProducer(topic);
+
+        AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test", tp2);
+        Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session2.createConsumer(topic);
+        con2.start();
+               
+        producer.send(session1.createTextMessage(message));
+        TextMessage tm1 = (TextMessage) consumer.receive(2000);
+        assertNotNull(tm1);
+        assertEquals(message, tm1.getText());
+        
+        con2.close();
+        con1.close();
+    } */
+    
+    public void testInterleavedRefs() throws Exception
+    {        
+        ConnectionTuneParameters tp = new ConnectionTuneParameters();
+        tp.setFrameMax(1000L);
+        tp.setChannelMax(32767);
+        tp.setHeartbeat(600);
+        String message = createMessage(500);
+        
+        AMQTopic topic = new AMQTopic("MyTopic");
+        AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test");
+        AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        BasicMessageProducer producer = session1.createBasicProducer(topic);
+
+        AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test", tp);
+        Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session2.createConsumer(topic);
+        con2.start();
+               
+        String refId1 = producer.openRef();
+        String refId2 = producer.openRef();
+        producer.transferRef(refId1, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders());
+        producer.transferRef(refId2, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders());
+        producer.appendRef(refId1, message.getBytes());
+        producer.appendRef(refId2, message.getBytes());
+        String refId3 = producer.openRef();
+        producer.appendRef(refId1, message.getBytes());
+        producer.transferRef(refId3, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders());
+        producer.appendRef(refId3, message.getBytes());
+        producer.appendRef(refId3, message.getBytes());
+        producer.appendRef(refId1, message.getBytes());
+        producer.closeRef(refId1);
+        producer.appendRef(refId3, message.getBytes());
+        producer.appendRef(refId3, message.getBytes());
+        producer.closeRef(refId2);
+        producer.appendRef(refId3, message.getBytes());
+        producer.closeRef(refId3);
+        
+        TextMessage tm1 = (TextMessage) consumer.receive(2000);
+        assertNotNull(tm1);
+        assertEquals(message + message + message, tm1.getText());
+        TextMessage tm2 = (TextMessage) consumer.receive(2000);
+        assertNotNull(tm2);
+        assertEquals(message, tm2.getText());
+        TextMessage tm3 = (TextMessage) consumer.receive(2000);
+        assertNotNull(tm3);
+        assertEquals(message + message + message + message + message, tm3.getText());
+        
+        con2.close();
+        con1.close();
+    }
+    
+    public void testEmptyContentRef() throws Exception
+    {
+        AMQTopic topic = new AMQTopic("MyTopic");
+        AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test");
+        AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        BasicMessageProducer producer = session1.createBasicProducer(topic);
+
+        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test");
+        Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session2.createConsumer(topic);
+        con2.start();        
+        
+        String refId = producer.openRef();
+        producer.transferRef(refId, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders());
+        producer.closeRef(refId);
+        TextMessage tm1 = (TextMessage) consumer.receive(2000);
+        assertNotNull(tm1);
+        assertEquals("", tm1.getText());
+        
+        con2.close();
+        con1.close();
+    }
+    
+    // Utility to create a message of the form "012345678901234567890..." that is len chars long.
+    private String createMessage(int len)
+    {
+        StringBuffer sb = new StringBuffer(len);
+        for (int i=0; i<len; i++)
+            sb.append(i%10);
+        return sb.toString();
     }
 }