You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/08/08 19:08:37 UTC
svn commit: r684016 - in /incubator/qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/protocol/
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/protocol/
common/src/main/java/org/apache/qpid/fram...
Author: aidan
Date: Fri Aug 8 10:08:37 2008
New Revision: 684016
URL: http://svn.apache.org/viewvc?rev=684016&view=rev
Log:
QPID-1218 Optionally use IoTransport, it's hot, but doesn't pass all the tests yet.
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Aug 8 10:08:37 2008
@@ -50,6 +50,7 @@
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.network.io.IoSender;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
@@ -845,4 +846,14 @@
{
return (_clientVersion == null) ? null : _clientVersion.toString();
}
+
+ public void setSender(IoSender sender)
+ {
+ // No-op, interface munging between this and AMQProtocolSession
+ }
+
+ public void init()
+ {
+ // No-op, interface munging between this and AMQProtocolSession
+ }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Fri Aug 8 10:08:37 2008
@@ -48,6 +48,7 @@
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,7 +89,16 @@
StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
- TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+ // TODO: use system property thingy for this
+ if (System.getProperty("UseTransportIo", "false").equals("false"))
+ {
+ TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+ }
+ else
+ {
+ _conn.getProtocolHandler().createIoTransportSession(brokerDetail);
+ }
+
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java?rev=684016&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java Fri Aug 8 10:08:37 2008
@@ -0,0 +1,125 @@
+package org.apache.qpid.client.protocol;
+
+import java.util.UUID;
+
+import javax.security.sasl.SaslClient;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.mina.common.IdleStatus;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.network.io.IoSender;
+
+public class AMQIoTransportProtocolSession extends AMQProtocolSession
+{
+
+ protected IoSender _ioSender;
+ private SaslClient _saslClient;
+ private ConnectionTuneParameters _connectionTuneParameters;
+
+ public AMQIoTransportProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+ {
+ super(protocolHandler, connection);
+ }
+
+ @Override
+ public void closeProtocolSession(boolean waitLast) throws AMQException
+ {
+ _ioSender.close();
+ _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
+ }
+
+ @Override
+ public void init()
+ {
+ _ioSender.send(new ProtocolInitiation(_connection.getProtocolVersion()).toNioByteBuffer());
+ _ioSender.flush();
+ }
+
+ @Override
+ protected AMQShortString generateQueueName()
+ {
+ int id;
+ synchronized (_queueIdLock)
+ {
+ id = _queueId++;
+ }
+ return new AMQShortString("tmp_" + UUID.randomUUID() + "_" + id);
+ }
+
+ @Override
+ public AMQConnection getAMQConnection()
+ {
+ return _connection;
+ }
+
+ @Override
+ public SaslClient getSaslClient()
+ {
+ return _saslClient;
+ }
+
+ @Override
+ public void setSaslClient(SaslClient client)
+ {
+ _saslClient = client;
+ }
+
+ /** @param delay delay in seconds (not ms) */
+ @Override
+ void initHeartbeats(int delay)
+ {
+ if (delay > 0)
+ {
+ // FIXME: actually do something here
+ HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
+ }
+ }
+
+ @Override
+ public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
+ {
+ // FIXME?
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody, null);
+ }
+
+ @Override
+ public void writeFrame(AMQDataBlock frame, boolean wait)
+ {
+ _ioSender.send(frame.toNioByteBuffer());
+ if (wait)
+ {
+ _ioSender.flush();
+ }
+ }
+
+ @Override
+ public void setSender(IoSender sender)
+ {
+ _ioSender = sender;
+ }
+
+ @Override
+ public ConnectionTuneParameters getConnectionTuneParameters()
+ {
+ return _connectionTuneParameters;
+ }
+
+ @Override
+ public void setConnectionTuneParameters(ConnectionTuneParameters params)
+ {
+ _connectionTuneParameters = params;
+ AMQConnection con = getAMQConnection();
+ con.setMaximumChannelCount(params.getChannelMax());
+ con.setMaximumFrameSize(params.getFrameMax());
+ initHeartbeats((int) params.getHeartbeat());
+ }
+}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Aug 8 10:08:37 2008
@@ -47,11 +47,13 @@
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -253,6 +255,19 @@
}
/**
+ * Called when we want to create a new IoTransport session
+ * @param brokerDetail
+ */
+ public void createIoTransportSession(BrokerDetails brokerDetail)
+ {
+ _protocolSession = new AMQProtocolSession(this, _connection);
+ _stateManager.setProtocolSession(_protocolSession);
+ IoTransport.connect_0_9(getProtocolSession(),
+ brokerDetail.getHost(), brokerDetail.getPort());
+ _protocolSession.init();
+ }
+
+ /**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
* that the connection be closed, in which case nothing is done, or because the connection died. In the case
* where the connection died, an attempt to failover automatically to a new connection may be started. The failover
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Aug 8 10:08:37 2008
@@ -44,6 +44,7 @@
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.network.io.IoSender;
import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
/**
@@ -99,7 +100,8 @@
private MethodDispatcher _methodDispatcher;
- private final AMQConnection _connection;
+ protected final AMQConnection _connection;
+
private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -118,11 +120,20 @@
}
+ public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+ {
+ _protocolHandler = protocolHandler;
+ _minaProtocolSession = null;
+ _protocolVersion = connection.getProtocolVersion();
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
+ this);
+ _connection = connection;
+ }
+
public void init()
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
-
_minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
}
@@ -171,7 +182,7 @@
public SaslClient getSaslClient()
{
- return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
+ return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
}
/**
@@ -422,6 +433,7 @@
}
_logger.debug("Closing protocol session");
+
final CloseFuture future = _minaProtocolSession.close();
// There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
@@ -430,7 +442,6 @@
// error now shouldn't matter.
_protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
-
future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
}
@@ -535,4 +546,9 @@
{
_protocolHandler.propagateExceptionToAllWaiters(error);
}
+
+ public void setSender(IoSender sender)
+ {
+ // No-op, interface munging
+ }
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java Fri Aug 8 10:08:37 2008
@@ -50,4 +50,14 @@
return buffer;
}
+ public java.nio.ByteBuffer toNioByteBuffer()
+ {
+ final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize());
+
+ ByteBuffer buf = ByteBuffer.wrap(buffer);
+ writePayload(buf);
+ buffer.flip();
+ return buffer;
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java Fri Aug 8 10:08:37 2008
@@ -21,6 +21,7 @@
package org.apache.qpid.protocol;
import org.apache.qpid.framing.*;
+import org.apache.qpid.transport.network.io.IoSender;
import org.apache.qpid.AMQException;
/**
@@ -54,4 +55,7 @@
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
+ public void setSender(IoSender sender);
+ public void init();
+
}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java?rev=684016&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java Fri Aug 8 10:08:37 2008
@@ -0,0 +1,109 @@
+package org.apache.qpid.transport.network.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyFactory;
+import org.apache.qpid.framing.BodyFactory;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentBodyFactory;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderBodyFactory;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.HeartbeatBodyFactory;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Receiver;
+
+public class InputHandler_0_9 implements Receiver<ByteBuffer>
+{
+
+ private AMQVersionAwareProtocolSession _session;
+ private MethodRegistry _registry;
+ private BodyFactory bodyFactory;
+ private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
+
+ static
+ {
+ _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
+ _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
+ _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
+ }
+
+ public InputHandler_0_9(AMQVersionAwareProtocolSession session)
+ {
+ _session = session;
+ _registry = _session.getMethodRegistry();
+ }
+
+ public void closed()
+ {
+ // AS FIXME: implement
+ }
+
+ public void exception(Throwable t)
+ {
+ // TODO: propogate exception to things
+ t.printStackTrace();
+ }
+
+ public void received(ByteBuffer buf)
+ {
+ org.apache.mina.common.ByteBuffer in = org.apache.mina.common.ByteBuffer.wrap(buf);
+ try
+ {
+ final byte type = in.get();
+ if (type == AMQMethodBody.TYPE)
+ {
+ bodyFactory = new AMQMethodBodyFactory(_session);
+ }
+ else
+ {
+ bodyFactory = _bodiesSupported[type];
+ }
+
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
+ }
+
+ final int channel = in.getUnsignedShort();
+ final long bodySize = in.getUnsignedInt();
+
+ // bodySize can be zero
+ if ((channel < 0) || (bodySize < 0))
+ {
+ throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel
+ + " bodySize = " + bodySize, null);
+ }
+
+ AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
+
+ byte marker = in.get();
+ if ((marker & 0xFF) != 0xCE)
+ {
+ throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
+ + " type=" + type, null);
+ }
+
+ try
+ {
+ frame.getBodyFrame().handle(frame.getChannel(), _session);
+ }
+ catch (AMQException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Fri Aug 8 10:08:37 2008
@@ -31,7 +31,7 @@
import static org.apache.qpid.transport.util.Functions.*;
-final class IoSender extends Thread implements Sender<ByteBuffer>
+public final class IoSender extends Thread implements Sender<ByteBuffer>
{
private static final Logger log = Logger.get(IoSender.class);
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java Fri Aug 8 10:08:37 2008
@@ -26,6 +26,7 @@
import java.net.SocketException;
import java.nio.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.Receiver;
@@ -82,6 +83,19 @@
private Connection connectInternal(String host, int port,
ConnectionDelegate delegate)
{
+ createSocket(host, port);
+
+ sender = new IoSender(this, 2*writeBufferSize, timeout);
+ Connection conn = new Connection
+ (new Disassembler(sender, 64*1024 - 1), delegate);
+ receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
+ 2*readBufferSize, timeout);
+
+ return conn;
+ }
+
+ private void createSocket(String host, int port)
+ {
try
{
InetAddress address = InetAddress.getByName(host);
@@ -108,14 +122,6 @@
{
throw new TransportException("Error connecting to broker", e);
}
-
- sender = new IoSender(this, 2*writeBufferSize, timeout);
- Connection conn = new Connection
- (new Disassembler(sender, 64*1024 - 1), delegate);
- receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
- 2*readBufferSize, timeout);
-
- return conn;
}
IoSender getSender()
@@ -133,4 +139,21 @@
return socket;
}
+ public static void connect_0_9 (AMQVersionAwareProtocolSession session, String host, int port)
+ {
+ IoTransport handler = new IoTransport();
+ handler.connectInternal_0_9(session, host, port);
+ }
+
+ public void connectInternal_0_9(AMQVersionAwareProtocolSession session, String host, int port)
+ {
+
+ createSocket(host, port);
+
+ sender = new IoSender(this, 2*writeBufferSize, timeout);
+ receiver = new IoReceiver(this, new InputHandler_0_9(session),
+ 2*readBufferSize, timeout);
+ session.setSender(sender);
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?rev=684016&r1=684015&r2=684016&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Fri Aug 8 10:08:37 2008
@@ -29,6 +29,7 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.transport.network.io.IoSender;
import javax.security.sasl.SaslServer;
import java.util.HashMap;
@@ -246,4 +247,16 @@
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+
+ public void setSender(IoSender sender)
+ {
+ // FIXME AS TODO
+
+ }
+
+ public void init()
+ {
+ // TODO Auto-generated method stub
+
+ }
}