You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/08/08 19:08:37 UTC

svn commit: r684016 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/protocol/ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/protocol/ common/src/main/java/org/apache/qpid/fram...

Author: aidan
Date: Fri Aug  8 10:08:37 2008
New Revision: 684016

URL: http://svn.apache.org/viewvc?rev=684016&view=rev
Log:
QPID-1218  Optionally use IoTransport, it's hot, but doesn't pass all the tests yet.

Added:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Aug  8 10:08:37 2008
@@ -50,6 +50,7 @@
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.network.io.IoSender;
 
 import javax.management.JMException;
 import javax.security.sasl.SaslServer;
@@ -845,4 +846,14 @@
     {
         return (_clientVersion == null) ? null : _clientVersion.toString();
     }
+
+    public void setSender(IoSender sender)
+    {
+       // No-op, interface munging between this and AMQProtocolSession
+    }
+
+    public void init()
+    {
+       // No-op, interface munging between this and AMQProtocolSession
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Fri Aug  8 10:08:37 2008
@@ -48,6 +48,7 @@
 import org.apache.qpid.framing.TxSelectOkBody;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.transport.network.io.IoTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,7 +89,16 @@
 
         StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
 
-        TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+        // TODO: use system property thingy for this
+        if (System.getProperty("UseTransportIo", "false").equals("false"))   
+        {
+            TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+        } 
+        else 
+        {
+            _conn.getProtocolHandler().createIoTransportSession(brokerDetail);
+        }
+        
         // this blocks until the connection has been set up or when an error
         // has prevented the connection being set up
 

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java?rev=684016&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java Fri Aug  8 10:08:37 2008
@@ -0,0 +1,125 @@
+package org.apache.qpid.client.protocol;
+
+import java.util.UUID;
+
+import javax.security.sasl.SaslClient;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.mina.common.IdleStatus;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.network.io.IoSender;
+
+public class AMQIoTransportProtocolSession extends AMQProtocolSession
+{
+
+    protected IoSender _ioSender;
+    private SaslClient _saslClient;
+    private ConnectionTuneParameters _connectionTuneParameters;
+    
+    public AMQIoTransportProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+    {
+        super(protocolHandler, connection);
+    }
+    
+    @Override
+    public void closeProtocolSession(boolean waitLast) throws AMQException
+    {
+        _ioSender.close();
+        _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
+    }
+
+    @Override
+    public void init()
+    {
+        _ioSender.send(new ProtocolInitiation(_connection.getProtocolVersion()).toNioByteBuffer());
+        _ioSender.flush();
+    }
+
+    @Override
+    protected AMQShortString generateQueueName()
+    {
+        int id;
+        synchronized (_queueIdLock)
+        {
+            id = _queueId++;
+        }
+        return new AMQShortString("tmp_" + UUID.randomUUID() + "_" + id);
+    }
+    
+    @Override
+    public AMQConnection getAMQConnection()
+    {
+        return _connection;
+    }
+    
+    @Override
+    public SaslClient getSaslClient()
+    {
+        return _saslClient;
+    }
+    
+    @Override
+    public void setSaslClient(SaslClient client)
+    {
+        _saslClient = client;
+    }
+    
+    /** @param delay delay in seconds (not ms) */
+    @Override
+    void initHeartbeats(int delay)
+    {
+        if (delay > 0)
+        {
+            // FIXME: actually do something here
+            HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
+        }
+    }
+    
+    @Override
+    public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
+    {
+        // FIXME?
+        _protocolHandler.methodBodyReceived(channel, amqMethodBody, null);
+    }
+    
+    @Override
+    public void writeFrame(AMQDataBlock frame, boolean wait)
+    {      
+        _ioSender.send(frame.toNioByteBuffer());
+        if (wait)
+        {
+            _ioSender.flush();
+        }
+    }
+    
+    @Override
+    public void setSender(IoSender sender)
+    {
+        _ioSender = sender;
+    }
+ 
+    @Override
+    public ConnectionTuneParameters getConnectionTuneParameters()
+    {
+        return _connectionTuneParameters;
+    }
+    
+    @Override
+    public void setConnectionTuneParameters(ConnectionTuneParameters params)
+    {
+        _connectionTuneParameters = params;
+        AMQConnection con = getAMQConnection();
+        con.setMaximumChannelCount(params.getChannelMax());
+        con.setMaximumFrameSize(params.getFrameMax());
+        initHeartbeats((int) params.getHeartbeat());
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Aug  8 10:08:37 2008
@@ -47,11 +47,13 @@
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.io.IoTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -253,6 +255,19 @@
     }
 
     /**
+     * Called when we want to create a new IoTransport session
+     * @param brokerDetail 
+     */
+    public void createIoTransportSession(BrokerDetails brokerDetail)
+    {
+        _protocolSession = new AMQProtocolSession(this, _connection);
+        _stateManager.setProtocolSession(_protocolSession);
+        IoTransport.connect_0_9(getProtocolSession(),
+                brokerDetail.getHost(), brokerDetail.getPort());
+        _protocolSession.init();
+    }
+    
+    /**
      * Called when the network connection is closed. This can happen, either because the client explicitly requested
      * that the connection be closed, in which case nothing is done, or because the connection died. In the case
      * where the connection died, an attempt to failover automatically to a new connection may be started. The failover

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Aug  8 10:08:37 2008
@@ -44,6 +44,7 @@
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.network.io.IoSender;
 import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
 
 /**
@@ -99,7 +100,8 @@
 
     private MethodDispatcher _methodDispatcher;
 
-    private final AMQConnection _connection;
+    protected final AMQConnection _connection;
+
     private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -118,11 +120,20 @@
 
     }
 
+    public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+    {
+        _protocolHandler = protocolHandler;
+        _minaProtocolSession = null;
+        _protocolVersion = connection.getProtocolVersion();
+        _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
+                                                                           this);
+        _connection = connection;
+    }
+
     public void init()
     {
         // start the process of setting up the connection. This is the first place that
         // data is written to the server.
-
         _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
     }
 
@@ -171,7 +182,7 @@
 
     public SaslClient getSaslClient()
     {
-        return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
+        return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);    
     }
 
     /**
@@ -422,6 +433,7 @@
         }
 
         _logger.debug("Closing protocol session");
+        
         final CloseFuture future = _minaProtocolSession.close();
 
         // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
@@ -430,7 +442,6 @@
         // error now shouldn't matter.
 
         _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
-
         future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
     }
 
@@ -535,4 +546,9 @@
     {
         _protocolHandler.propagateExceptionToAllWaiters(error);
     }
+
+    public void setSender(IoSender sender)
+    {
+        // No-op, interface munging
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java Fri Aug  8 10:08:37 2008
@@ -50,4 +50,14 @@
         return buffer;
     }
 
+    public java.nio.ByteBuffer toNioByteBuffer()
+    {
+        final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize());
+
+        ByteBuffer buf = ByteBuffer.wrap(buffer);
+        writePayload(buf);    
+        buffer.flip();
+        return buffer;
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java Fri Aug  8 10:08:37 2008
@@ -21,6 +21,7 @@
 package org.apache.qpid.protocol;
 
 import org.apache.qpid.framing.*;
+import org.apache.qpid.transport.network.io.IoSender;
 import org.apache.qpid.AMQException;
 
 /**
@@ -54,4 +55,7 @@
     public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
 
 
+    public void setSender(IoSender sender);
+    public void init();
+
 }

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java?rev=684016&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java Fri Aug  8 10:08:37 2008
@@ -0,0 +1,109 @@
+package org.apache.qpid.transport.network.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyFactory;
+import org.apache.qpid.framing.BodyFactory;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentBodyFactory;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderBodyFactory;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.HeartbeatBodyFactory;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Receiver;
+
+public class InputHandler_0_9 implements Receiver<ByteBuffer>
+{
+
+    private AMQVersionAwareProtocolSession _session;
+    private MethodRegistry _registry;
+    private BodyFactory bodyFactory;
+    private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
+
+    static
+    {
+        _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
+        _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
+        _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
+    }
+    
+    public InputHandler_0_9(AMQVersionAwareProtocolSession session)
+    {
+        _session = session;
+        _registry = _session.getMethodRegistry();
+    }
+
+    public void closed()
+    {
+        // AS FIXME: implement
+    }
+
+    public void exception(Throwable t)
+    {
+        // TODO: propogate exception to things
+        t.printStackTrace();
+    }
+
+    public void received(ByteBuffer buf)
+    {
+        org.apache.mina.common.ByteBuffer in = org.apache.mina.common.ByteBuffer.wrap(buf);
+        try
+        {
+            final byte type = in.get();
+            if (type == AMQMethodBody.TYPE)
+            {
+                bodyFactory = new AMQMethodBodyFactory(_session);
+            }
+            else
+            {
+                bodyFactory = _bodiesSupported[type];
+            }
+
+            if (bodyFactory == null)
+            {
+                throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
+            }
+
+            final int channel = in.getUnsignedShort();
+            final long bodySize = in.getUnsignedInt();
+
+            // bodySize can be zero
+            if ((channel < 0) || (bodySize < 0))
+            {
+                throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel
+                    + " bodySize = " + bodySize, null);
+            }
+
+            AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
+
+            byte marker = in.get();
+            if ((marker & 0xFF) != 0xCE)
+            {
+                throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
+                    + " type=" + type, null);
+            }
+
+            try
+            {
+                frame.getBodyFrame().handle(frame.getChannel(), _session);
+            }
+            catch (AMQException e)
+            {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        }
+        catch (AMQFrameDecodingException e)
+        {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+
+}

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Fri Aug  8 10:08:37 2008
@@ -31,7 +31,7 @@
 import static org.apache.qpid.transport.util.Functions.*;
 
 
-final class IoSender extends Thread implements Sender<ByteBuffer>
+public final class IoSender extends Thread implements Sender<ByteBuffer>
 {
 
     private static final Logger log = Logger.get(IoSender.class);

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java Fri Aug  8 10:08:37 2008
@@ -26,6 +26,7 @@
 import java.net.SocketException;
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.ConnectionDelegate;
 import org.apache.qpid.transport.Receiver;
@@ -82,6 +83,19 @@
     private Connection connectInternal(String host, int port,
             ConnectionDelegate delegate)
     {
+        createSocket(host, port);
+
+        sender = new IoSender(this, 2*writeBufferSize, timeout);
+        Connection conn = new Connection
+            (new Disassembler(sender, 64*1024 - 1), delegate);
+        receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
+                                  2*readBufferSize, timeout);
+
+        return conn;
+    }
+
+    private void createSocket(String host, int port)
+    {
         try
         {
             InetAddress address = InetAddress.getByName(host);
@@ -108,14 +122,6 @@
         {
             throw new TransportException("Error connecting to broker", e);
         }
-
-        sender = new IoSender(this, 2*writeBufferSize, timeout);
-        Connection conn = new Connection
-            (new Disassembler(sender, 64*1024 - 1), delegate);
-        receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
-                                  2*readBufferSize, timeout);
-
-        return conn;
     }
 
     IoSender getSender()
@@ -133,4 +139,21 @@
         return socket;
     }
 
+    public static void connect_0_9 (AMQVersionAwareProtocolSession session, String host, int port)
+    {
+        IoTransport handler = new IoTransport();
+        handler.connectInternal_0_9(session, host, port);
+    }
+    
+    public void connectInternal_0_9(AMQVersionAwareProtocolSession session, String host, int port)
+    {
+
+        createSocket(host, port);
+
+        sender = new IoSender(this, 2*writeBufferSize, timeout);
+        receiver = new IoReceiver(this, new InputHandler_0_9(session),
+                    2*readBufferSize, timeout);
+        session.setSender(sender);
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Fri Aug  8 10:08:37 2008
@@ -29,6 +29,7 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.transport.network.io.IoSender;
 
 import javax.security.sasl.SaslServer;
 import java.util.HashMap;
@@ -246,4 +247,16 @@
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
+
+    public void setSender(IoSender sender)
+    {
+        // FIXME AS TODO
+        
+    }
+
+    public void init()
+    {
+        // TODO Auto-generated method stub
+        
+    }
 }